PydanticAI 与 LangGraph 协同集成
本章探讨如何让 PydanticAI 和 LangGraph 协同工作,发挥各自优势。
本章概览
本章将介绍:
- 为什么需要协同集成
- 四种集成模式
- 完整的集成示例
- 最佳实践和注意事项
1. 为什么需要协同
1.1 互补优势
┌─────────────────────────────────────────────────────────────────┐
│ 协同集成价值 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ PydanticAI 贡献: LangGraph 贡献: │
│ ───────────────── ───────────────── │
│ │
│ ✓ 类型安全的输出 ✓ 复杂流程编排 │
│ ✓ 优雅的依赖注入 ✓ 精确的状态管理 │
│ ✓ 简洁的 Agent 定义 ✓ 多 Agent 协作 │
│ ✓ 内置验证和重试 ✓ 检查点和恢复 │
│ ✓ Logfire 可观测 ✓ 条件路由控制 │
│ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ 协同集成系统 │ │
│ │ │ │
│ │ 最佳开发体验 │ │
│ │ + │ │
│ │ 最强编排能力 │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘1.2 典型场景
| 场景 | PydanticAI 角色 | LangGraph 角色 |
|---|---|---|
| 多专家系统 | 定义各专家 Agent | 编排专家协作流程 |
| RAG 系统 | 处理检索结果 | 管理检索-生成流程 |
| 工作流自动化 | 执行具体任务 | 定义工作流状态机 |
| 审批系统 | 生成审批意见 | 管理审批流程 |
2. 四种集成模式
2.1 模式一:PydanticAI Agent 作为 LangGraph 节点
最常用的模式:将 PydanticAI Agent 包装为 LangGraph 节点。
python
from langgraph.graph import StateGraph, START, END
from pydantic_ai import Agent
from pydantic import BaseModel
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
# ========== PydanticAI 定义 ==========
class AnalysisResult(BaseModel):
summary: str
sentiment: str
confidence: float
analyst_agent = Agent(
'openai:gpt-4o',
output_type=AnalysisResult,
instructions='你是一位专业分析师,分析用户提供的内容。'
)
class WritingResult(BaseModel):
title: str
content: str
writer_agent = Agent(
'openai:gpt-4o',
output_type=WritingResult,
instructions='你是一位专业作家,根据分析结果撰写文章。'
)
# ========== LangGraph 状态 ==========
class WorkflowState(TypedDict):
input_text: str
analysis: AnalysisResult | None
article: WritingResult | None
messages: Annotated[list, add_messages]
# ========== 包装为节点 ==========
async def analyze_node(state: WorkflowState) -> dict:
"""分析节点:使用 PydanticAI Agent"""
result = await analyst_agent.run(state["input_text"])
return {
"analysis": result.output,
"messages": [("assistant", f"分析完成: {result.output.summary}")]
}
async def write_node(state: WorkflowState) -> dict:
"""写作节点:使用 PydanticAI Agent"""
prompt = f"""
基于以下分析撰写文章:
摘要:{state["analysis"].summary}
情感:{state["analysis"].sentiment}
"""
result = await writer_agent.run(prompt)
return {
"article": result.output,
"messages": [("assistant", f"文章完成: {result.output.title}")]
}
# ========== 构建图 ==========
graph = StateGraph(WorkflowState)
graph.add_node("analyze", analyze_node)
graph.add_node("write", write_node)
graph.add_edge(START, "analyze")
graph.add_edge("analyze", "write")
graph.add_edge("write", END)
workflow = graph.compile()
# ========== 运行 ==========
async def main():
result = await workflow.ainvoke({
"input_text": "人工智能正在改变我们的生活...",
"analysis": None,
"article": None,
"messages": []
})
print(f"分析: {result['analysis']}")
print(f"文章: {result['article'].title}")优势:
- PydanticAI 提供类型安全的输出
- LangGraph 管理整体流程
- 两者优势完美结合
2.2 模式二:LangGraph 作为 PydanticAI 工具
反向集成:将 LangGraph 图作为 PydanticAI 的工具。
python
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
# ========== 定义 LangGraph 工作流 ==========
class ResearchState(TypedDict):
query: str
results: list[str]
def search_node(state: ResearchState) -> dict:
# 模拟搜索
return {"results": [f"Result for: {state['query']}"]}
def summarize_node(state: ResearchState) -> dict:
summary = "\n".join(state["results"])
return {"results": [f"Summary: {summary}"]}
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_node)
research_graph.add_node("summarize", summarize_node)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_workflow = research_graph.compile()
# ========== PydanticAI Agent 使用 LangGraph 作为工具 ==========
main_agent = Agent(
'openai:gpt-4o',
instructions='你是一个研究助手,可以使用研究工具来回答问题。'
)
@main_agent.tool_plain
async def research(query: str) -> str:
"""
执行研究任务
Args:
query: 研究主题
"""
result = await research_workflow.ainvoke({"query": query, "results": []})
return "\n".join(result["results"])
# 使用
async def main():
result = await main_agent.run("研究一下量子计算的最新进展")
print(result.output)2.3 模式三:共享状态集成
深度集成:共享状态和依赖。
python
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Any
# ========== 共享依赖 ==========
@dataclass
class SharedDependencies:
database: Any # 数据库连接
cache: Any # 缓存
user_id: int # 当前用户
# ========== PydanticAI Agents ==========
query_agent = Agent(
'openai:gpt-4o',
deps_type=SharedDependencies,
instructions='你负责理解用户查询意图。'
)
@query_agent.tool
async def get_user_history(ctx: RunContext[SharedDependencies]) -> str:
"""获取用户历史"""
history = await ctx.deps.database.get_history(ctx.deps.user_id)
return str(history)
action_agent = Agent(
'openai:gpt-4o',
deps_type=SharedDependencies,
instructions='你负责执行具体操作。'
)
# ========== LangGraph 状态(包含共享依赖) ==========
class IntegratedState(TypedDict):
query: str
intent: str
result: str
deps: SharedDependencies # 共享依赖
async def understand_node(state: IntegratedState) -> dict:
"""理解意图"""
result = await query_agent.run(
state["query"],
deps=state["deps"]
)
return {"intent": result.output}
async def execute_node(state: IntegratedState) -> dict:
"""执行操作"""
result = await action_agent.run(
f"执行: {state['intent']}",
deps=state["deps"]
)
return {"result": result.output}
# 构建集成图
integrated_graph = StateGraph(IntegratedState)
integrated_graph.add_node("understand", understand_node)
integrated_graph.add_node("execute", execute_node)
integrated_graph.add_edge(START, "understand")
integrated_graph.add_edge("understand", "execute")
integrated_graph.add_edge("execute", END)
app = integrated_graph.compile()2.4 模式四:A2A 协议集成
标准化集成:通过 Agent-to-Agent 协议。
python
from pydantic_ai import Agent
from fasta2a import A2AClient
import httpx
# ========== PydanticAI Agent 作为 A2A 服务 ==========
specialist_agent = Agent(
'openai:gpt-4o',
instructions='你是领域专家。'
)
# 转换为 A2A 服务
a2a_app = specialist_agent.to_a2a()
# 运行: uvicorn a2a_server:a2a_app --port 8001
# ========== 在 LangGraph 中调用 A2A 服务 ==========
class A2AState(TypedDict):
query: str
expert_response: str
async def call_expert_node(state: A2AState) -> dict:
"""通过 A2A 协议调用专家"""
async with httpx.AsyncClient() as client:
a2a_client = A2AClient(
base_url="http://localhost:8001",
http_client=client
)
result = await a2a_client.send_task(
message=state["query"]
)
return {"expert_response": result.output}
# 构建使用 A2A 的图
a2a_graph = StateGraph(A2AState)
a2a_graph.add_node("call_expert", call_expert_node)
a2a_graph.add_edge(START, "call_expert")
a2a_graph.add_edge("call_expert", END)3. 完整集成示例:智能客服系统
3.1 系统架构
┌─────────────────────────────────────────────────────────────────┐
│ 智能客服系统架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 用户请求 │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ LangGraph 编排层 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────┐ │ │
│ │ │ 意图 │───►│ 路由 │───►│ 专家 │───►│ 输出│ │ │
│ │ │ 识别 │ │ 决策 │ │ 处理 │ │ 格式│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────┘ │ │
│ │ │ │ │ │ │
│ └────────┼──────────────┼──────────────┼────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PydanticAI Agent 层 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 意图 │ │ 订单 │ │ 技术 │ ... │ │
│ │ │ Agent │ │ Agent │ │ Agent │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘3.2 完整代码
python
"""
智能客服系统 - PydanticAI + LangGraph 协同示例
"""
from dataclasses import dataclass
from typing import TypedDict, Literal, Annotated
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# ============================================================
# 第一部分:共享依赖和数据模型
# ============================================================
@dataclass
class CustomerServiceDeps:
"""客服系统共享依赖"""
customer_id: str
db_connection: object # 数据库连接
order_api: object # 订单 API
async def get_customer_info(self) -> dict:
"""获取客户信息"""
# 模拟数据库查询
return {
"name": "张三",
"level": "VIP",
"history": ["订单查询", "退款申请"]
}
async def get_recent_orders(self) -> list:
"""获取最近订单"""
return [
{"id": "ORD001", "status": "已发货", "amount": 299},
{"id": "ORD002", "status": "待付款", "amount": 599},
]
# 意图分类结果
class IntentResult(BaseModel):
intent: Literal["order", "refund", "technical", "general"]
confidence: float = Field(ge=0, le=1)
entities: dict = Field(default_factory=dict)
# 订单查询结果
class OrderResponse(BaseModel):
found: bool
order_id: str | None = None
status: str | None = None
message: str
# 最终响应
class FinalResponse(BaseModel):
message: str
suggested_actions: list[str] = Field(default_factory=list)
satisfaction_prompt: bool = True
# ============================================================
# 第二部分:PydanticAI Agents 定义
# ============================================================
# 意图识别 Agent
intent_agent = Agent(
'openai:gpt-4o',
output_type=IntentResult,
deps_type=CustomerServiceDeps,
instructions='''
你是意图识别专家。分析用户消息,识别以下意图之一:
- order: 订单相关(查询、物流、状态)
- refund: 退款退货相关
- technical: 技术问题(产品使用、故障)
- general: 一般咨询
返回意图类型、置信度和提取的实体(如订单号)。
'''
)
@intent_agent.tool
async def get_customer_context(ctx: RunContext[CustomerServiceDeps]) -> str:
"""获取客户上下文信息"""
info = await ctx.deps.get_customer_info()
return f"客户: {info['name']}, 等级: {info['level']}, 历史: {info['history']}"
# 订单处理 Agent
order_agent = Agent(
'openai:gpt-4o',
output_type=OrderResponse,
deps_type=CustomerServiceDeps,
instructions='''
你是订单处理专家。处理用户的订单查询请求。
使用工具查询订单信息,给出准确回复。
'''
)
@order_agent.tool
async def query_orders(ctx: RunContext[CustomerServiceDeps]) -> str:
"""查询客户订单"""
orders = await ctx.deps.get_recent_orders()
return str(orders)
@order_agent.tool_plain
def format_order_status(order_id: str, status: str) -> str:
"""格式化订单状态"""
status_map = {
"已发货": "您的订单已发货,预计3-5天送达",
"待付款": "订单待付款,请尽快完成支付",
"已完成": "订单已完成,感谢您的购买",
}
return status_map.get(status, f"订单 {order_id} 状态: {status}")
# 退款处理 Agent
refund_agent = Agent(
'openai:gpt-4o',
deps_type=CustomerServiceDeps,
instructions='''
你是退款处理专家。帮助用户处理退款退货请求。
了解退款原因,说明退款政策,引导用户完成退款流程。
'''
)
# 技术支持 Agent
technical_agent = Agent(
'openai:gpt-4o',
instructions='''
你是技术支持专家。解答产品使用问题和技术故障。
提供清晰的步骤指导,必要时建议联系专业客服。
'''
)
# 通用咨询 Agent
general_agent = Agent(
'openai:gpt-4o',
instructions='''
你是通用咨询专家。回答一般性问题,介绍公司政策和服务。
保持友好专业的态度。
'''
)
# 响应格式化 Agent
formatter_agent = Agent(
'openai:gpt-4o',
output_type=FinalResponse,
instructions='''
你是响应格式化专家。将专家的回复整理成用户友好的格式。
- 保持简洁专业
- 添加相关的建议操作
- 适当询问满意度
'''
)
# ============================================================
# 第三部分:LangGraph 状态和节点
# ============================================================
class CustomerServiceState(TypedDict):
"""客服系统状态"""
messages: Annotated[list, add_messages]
user_input: str
intent: IntentResult | None
expert_response: str | None
final_response: FinalResponse | None
deps: CustomerServiceDeps
async def intent_node(state: CustomerServiceState) -> dict:
"""意图识别节点"""
result = await intent_agent.run(
state["user_input"],
deps=state["deps"]
)
return {
"intent": result.output,
"messages": [("system", f"识别意图: {result.output.intent}")]
}
async def order_node(state: CustomerServiceState) -> dict:
"""订单处理节点"""
result = await order_agent.run(
state["user_input"],
deps=state["deps"]
)
return {
"expert_response": result.output.message,
"messages": [("assistant", result.output.message)]
}
async def refund_node(state: CustomerServiceState) -> dict:
"""退款处理节点"""
result = await refund_agent.run(
state["user_input"],
deps=state["deps"]
)
return {
"expert_response": result.output,
"messages": [("assistant", result.output)]
}
async def technical_node(state: CustomerServiceState) -> dict:
"""技术支持节点"""
result = await technical_agent.run(state["user_input"])
return {
"expert_response": result.output,
"messages": [("assistant", result.output)]
}
async def general_node(state: CustomerServiceState) -> dict:
"""通用咨询节点"""
result = await general_agent.run(state["user_input"])
return {
"expert_response": result.output,
"messages": [("assistant", result.output)]
}
async def format_node(state: CustomerServiceState) -> dict:
"""响应格式化节点"""
result = await formatter_agent.run(
f"请格式化以下回复:\n{state['expert_response']}"
)
return {"final_response": result.output}
# 路由函数
def route_by_intent(state: CustomerServiceState) -> str:
"""根据意图路由到专家节点"""
if state["intent"] is None:
return "general"
intent = state["intent"].intent
route_map = {
"order": "order",
"refund": "refund",
"technical": "technical",
"general": "general",
}
return route_map.get(intent, "general")
# ============================================================
# 第四部分:构建工作流图
# ============================================================
def create_customer_service_graph():
"""创建客服工作流图"""
graph = StateGraph(CustomerServiceState)
# 添加节点
graph.add_node("intent", intent_node)
graph.add_node("order", order_node)
graph.add_node("refund", refund_node)
graph.add_node("technical", technical_node)
graph.add_node("general", general_node)
graph.add_node("format", format_node)
# 添加边
graph.add_edge(START, "intent")
# 条件路由
graph.add_conditional_edges(
"intent",
route_by_intent,
{
"order": "order",
"refund": "refund",
"technical": "technical",
"general": "general",
}
)
# 所有专家节点都连接到格式化节点
graph.add_edge("order", "format")
graph.add_edge("refund", "format")
graph.add_edge("technical", "format")
graph.add_edge("general", "format")
# 格式化后结束
graph.add_edge("format", END)
return graph.compile()
# ============================================================
# 第五部分:运行系统
# ============================================================
async def handle_customer_query(
query: str,
customer_id: str,
) -> FinalResponse:
"""处理客户查询"""
# 创建依赖
deps = CustomerServiceDeps(
customer_id=customer_id,
db_connection=None, # 实际项目中注入真实连接
order_api=None,
)
# 创建工作流
workflow = create_customer_service_graph()
# 执行
result = await workflow.ainvoke({
"user_input": query,
"intent": None,
"expert_response": None,
"final_response": None,
"deps": deps,
"messages": [],
})
return result["final_response"]
# 使用示例
async def main():
# 测试不同类型的查询
queries = [
"我想查一下我的订单状态",
"产品使用不了,一直报错",
"我想申请退款",
"你们公司在哪里?",
]
for query in queries:
print(f"\n用户: {query}")
response = await handle_customer_query(query, "CUST001")
print(f"客服: {response.message}")
print(f"建议: {response.suggested_actions}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())4. 最佳实践
4.1 设计原则
┌─────────────────────────────────────────────────────────────────┐
│ 协同设计原则 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 职责分离 │
│ ───────── │
│ PydanticAI: 专注单个任务的类型安全执行 │
│ LangGraph: 专注多任务的流程编排 │
│ │
│ 2. 状态管理 │
│ ───────── │
│ • 使用 LangGraph 管理全局状态 │
│ • 使用 PydanticAI deps 传递任务依赖 │
│ • 避免状态混乱 │
│ │
│ 3. 错误处理 │
│ ───────── │
│ • PydanticAI 处理验证和重试 │
│ • LangGraph 处理流程级错误 │
│ • 统一的错误日志 │
│ │
│ 4. 可观测性 │
│ ───────── │
│ • Logfire 追踪 PydanticAI │
│ • LangSmith 追踪 LangGraph │
│ • 或统一使用 OpenTelemetry │
│ │
└─────────────────────────────────────────────────────────────────┘4.2 代码组织
project/
├── agents/ # PydanticAI Agents
│ ├── __init__.py
│ ├── intent.py # 意图识别 Agent
│ ├── order.py # 订单处理 Agent
│ └── support.py # 技术支持 Agent
│
├── workflows/ # LangGraph 工作流
│ ├── __init__.py
│ ├── customer_service.py # 客服工作流
│ └── approval.py # 审批工作流
│
├── shared/ # 共享资源
│ ├── __init__.py
│ ├── dependencies.py # 共享依赖
│ ├── models.py # 共享数据模型
│ └── utils.py # 工具函数
│
├── tests/
│ ├── test_agents.py # Agent 测试
│ └── test_workflows.py # 工作流测试
│
└── main.py # 入口4.3 测试策略
python
import pytest
from pydantic_ai.models.test import TestModel
# 单独测试 PydanticAI Agent
@pytest.fixture
def test_intent_agent():
return Agent(
TestModel(custom_output_text='{"intent": "order", "confidence": 0.95, "entities": {}}'),
output_type=IntentResult,
)
def test_intent_recognition(test_intent_agent):
result = test_intent_agent.run_sync("查询订单")
assert result.output.intent == "order"
assert result.output.confidence > 0.9
# 测试集成工作流
@pytest.fixture
def mock_deps():
return CustomerServiceDeps(
customer_id="TEST001",
db_connection=MockDB(),
order_api=MockOrderAPI(),
)
async def test_workflow(mock_deps):
workflow = create_customer_service_graph()
# 使用测试模型覆盖
with intent_agent.override(model=TestModel()):
result = await workflow.ainvoke({
"user_input": "查询订单",
"deps": mock_deps,
# ...
})
assert result["final_response"] is not None5. 注意事项
5.1 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 依赖传递失败 | 状态类型不匹配 | 确保 deps 类型一致 |
| 类型丢失 | LangGraph 状态是 dict | 使用 TypedDict 保持类型 |
| 异步混乱 | 同步/异步混用 | 统一使用 async |
| 状态污染 | 节点修改共享状态 | 返回新状态,不要修改 |
5.2 性能优化
python
# 1. 并行执行独立 Agent
import asyncio
async def parallel_experts(state: State) -> dict:
# 并行调用多个专家
tasks = [
expert1.run(state["query"]),
expert2.run(state["query"]),
expert3.run(state["query"]),
]
results = await asyncio.gather(*tasks)
return {"expert_results": results}
# 2. 缓存 Agent 结果
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_analysis(text_hash: str):
# 缓存分析结果
pass
# 3. 流式处理
async def streaming_node(state: State) -> dict:
async with agent.run_stream(state["input"]) as stream:
chunks = []
async for chunk in stream.stream_text(delta=True):
chunks.append(chunk)
# 可以实时发送到前端
return {"response": "".join(chunks)}6. 小结
PydanticAI 和 LangGraph 的协同集成可以:
| 收益 | 说明 |
|---|---|
| 最佳开发体验 | PydanticAI 的类型安全 + 简洁 API |
| 最强编排能力 | LangGraph 的流程控制 + 状态管理 |
| 灵活架构 | 多种集成模式适应不同场景 |
| 可维护性 | 清晰的职责分离 |
推荐模式:
- 模式一(PydanticAI as Node)最常用
- 模式三(共享状态)最灵活
- 根据项目需求选择合适的模式
上一章:与 LangGraph 对比下一章:实践案例